
/*
窗口累加器的Logical表示，以and、or等方式形成复杂组合
*/

use crate::conf_init::{MapResult};
use crate::logical_plan::expr::partition_by::{LogicalPlanExprPartitionType, parse_partition_by};
use sapeur::logical_comb::LogicalCombOutput;
use crate::logical_plan::work_node::conf::WindowConf;
use crate::logical_plan::expr::func::{LogicalPlanExprFuncOrValue, conf_parse_logical_expr_func};
use crate::logical_plan::expr::accumulator::{LogicalPlanExprAccumulator, conf_parse_logical_expr_accumulator_comb};

#[derive(Debug)]
pub struct LogicalPlanExprWindow{
    pub name: String,
    //window partition_by 的几种灵活配置场景，支持对多元素处理的基础上进行组合partition
    // (a,b,c) (d,e,f) element_func((h,i,j),5)
    pub partition_by: Option<LogicalPlanExprPartitionType>,
    /*
        窗口累加器，由于窗口累加器也支持嵌套组合形成更复杂的方式，
        所以也要有Deparated或Delimited
        先暂定配置格式为:
        单个：count(gt,5)
        组合 (count(gt,5) && count(lt,10)) 个数大于5，小于10
        累加器支持两种类型的参数，第一种是用于表达需求的配置参数RealConfValue，
                             第二种是用于累加器基于数据灵活运算的数据参数PhysicalPlanExpr
         基于以上，累加器配置格式暂定为 accumulator_name(RealConfValue,...,RealConfValue;LogicalPlanExprFunc,...LogicalPlanExprFunc)
                 累加器的两种参数均可以为空。
    */
    pub accumulator: LogicalCombOutput<LogicalPlanExprAccumulator>,
    pub sequential: LogicalPlanExprFuncOrValue,
    pub uniqueness: LogicalPlanExprFuncOrValue,
    pub window_time: u64,
    pub slide_time: u64,
    pub wait_time: u64,
    pub time_unit: TimeUnit,
}

pub fn conf_map_to_window(conf: &WindowConf)->MapResult<LogicalPlanExprWindow>{
    let partition_by = if let Some((order,desc)) = &conf.partition_by{
        Some(parse_partition_by(*order, desc.as_str())?)
    }else{
        None
    };

    let accumulator = conf_parse_logical_expr_accumulator_comb(conf.accumulator.as_str())?;
    let sequential = conf_parse_logical_expr_func(conf.sequential.as_str())?;
    let uniqueness = conf_parse_logical_expr_func(conf.uniqueness.as_str())?;

    return Ok(
        LogicalPlanExprWindow{
            name: conf.name.clone(),
            partition_by: partition_by,
            accumulator: accumulator,
            sequential: sequential,
            uniqueness: uniqueness,
            window_time: conf.window_time,
            slide_time: conf.slide_time,
            wait_time: conf.wait_time,
            time_unit: conf.time_unit.clone(),
        }
    );
}

#[derive(Debug,Clone,Deserialize)]
pub enum TimeUnit{
    Sec,
    Mic,
    Mil,
}



